---
title: "RetryPolicy"
icon: "rotate-right"
description: ""
---

```python
class RetryPolicy
```

The `RetryPolicy` class defines how retries are handled for failed atoms within a workflow. It lets you configure the number of attempts, the delay between retries, the backoff multiplier, and which exceptions trigger a retry.

### Flexible Application:

- Can be applied at the **workflow level** (default for all atoms).
- Can be overridden by individual atoms with a custom retry policy.

## Parameters

### Optional Parameters (Defaults Provided):

- **`max_attempts`** _(int)_: Maximum number of retry attempts.  
  **Default**: `3`
- **`delay`** _(float)_: Initial delay in seconds between retry attempts.  
  **Default**: `1.0`
- **`backoff_factor`** _(float)_: Multiplier applied to the delay after each failed attempt, so that successive retries wait progressively longer.  
  **Default**: `2.0`
- **`retry_exceptions`** _(tuple of exceptions)_: Specific exceptions on which retries are attempted. By default, retries are attempted on all exceptions (`Exception`).  
  **Default**: `(Exception,)`

## Functions

```python
def __init__(
    self,
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    retry_exceptions: tuple = (Exception,),
):
```

Creates a retry policy. Any argument that is omitted falls back to the default listed above, so `RetryPolicy()` with no arguments uses all of the defaults.

```python
def should_retry(self, attempt: int, error: Exception) -> bool:
```

Determines whether another retry attempt should be made, given the current attempt number and the error that was raised.

```python
def get_delay(self, attempt: int) -> float:
```

Calculates the delay, in seconds, before the next retry attempt.
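To make the two methods above concrete, here is a minimal sketch of how they could behave. It is a stand-in class (`RetryPolicySketch` is a hypothetical name), not the library's source. It assumes that `attempt` is 1-based, that retries stop once `attempt` reaches `max_attempts`, and that the delay grows as `delay * backoff_factor ** (attempt - 1)`. The real implementation may differ in any of these details.

```python
class RetryPolicySketch:
    """Illustrative stand-in for RetryPolicy; not the library's source."""

    def __init__(self, max_attempts=3, delay=1.0, backoff_factor=2.0,
                 retry_exceptions=(Exception,)):
        self.max_attempts = max_attempts
        self.delay = delay
        self.backoff_factor = backoff_factor
        self.retry_exceptions = retry_exceptions

    def should_retry(self, attempt: int, error: Exception) -> bool:
        # Assumption: `attempt` is 1-based and counts attempts made so far.
        if attempt >= self.max_attempts:
            return False
        # Only retry on errors that match one of the configured types.
        return isinstance(error, self.retry_exceptions)

    def get_delay(self, attempt: int) -> float:
        # Assumption: the first retry waits `delay`; each later retry
        # waits `backoff_factor` times longer than the previous one.
        return self.delay * self.backoff_factor ** (attempt - 1)


policy = RetryPolicySketch(retry_exceptions=(IOError, TimeoutError))
delays = [policy.get_delay(n) for n in (1, 2, 3)]
print(delays)  # [1.0, 2.0, 4.0]
```

Under these assumptions, the default settings would wait 1 second and then 2 seconds between the three attempts.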

## Usage Examples

### 1. Applying a RetryPolicy to a Workflow

Define a retry policy for the entire workflow. This policy will be applied to all atoms unless overridden at the atom level.

```python
from preswald import Workflow, RetryPolicy

# Define a custom retry policy
custom_policy = RetryPolicy(
    max_attempts=3,
    delay=1.0,
    backoff_factor=2.0,
    retry_exceptions=(IOError, TimeoutError)
)

# Apply the policy to the workflow
workflow = Workflow(default_retry_policy=custom_policy)
```

### 2. Overriding a RetryPolicy for a Specific Atom

Atoms can have their own retry policy, which overrides the workflow's default policy.

```python
# Reuses `workflow` and `RetryPolicy` from the previous example
@workflow.atom(retry_policy=RetryPolicy(max_attempts=5, delay=0.5))
def fetch_data():
    # Simulate a network operation that always fails
    raise IOError("Simulated failure")

### 3. Combining Workflow and Atom Policies

If both workflow-level and atom-level policies are specified, the atom's policy takes precedence.
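The precedence rule can be pictured as a simple fallback. The sketch below uses a hypothetical helper, `resolve_policy`, and plain dictionaries in place of real policy objects; it only illustrates the rule and is not how the library is known to implement it.

```python
from typing import Optional


def resolve_policy(atom_policy: Optional[dict], workflow_policy: dict) -> dict:
    # Hypothetical helper: the atom-level policy wins whenever one is set;
    # otherwise the workflow default applies.
    return atom_policy if atom_policy is not None else workflow_policy


# Plain dicts stand in for RetryPolicy instances in this illustration.
workflow_default = {"max_attempts": 3, "delay": 1.0}
atom_override = {"max_attempts": 5, "delay": 0.5}

with_override = resolve_policy(atom_override, workflow_default)
without_override = resolve_policy(None, workflow_default)
```

Here `with_override` is the atom's own policy, while `without_override` falls back to the workflow default.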

## Key Features

1. **Customizable Retry Logic**:

   - Adjust the number of attempts, delay, and backoff.
   - Target specific exceptions for retries.

2. **Exponential Backoff**:

   - Space out retry attempts by multiplying the delay by `backoff_factor` after each failure, so repeated failures do not hammer the failing resource.

3. **Granular Control**:
   - Use workflow-level defaults for simplicity.
   - Override defaults at the atom level for specific tasks.
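As a rough illustration of how these features fit together, the sketch below shows a generic retry loop with exponential backoff. The function `run_with_retries` and its `sleep` parameter are hypothetical and exist only for this example; the loop is an assumption about how a runner might apply a policy, not the library's execution code.

```python
import time
from typing import Callable, Tuple, Type


def run_with_retries(
    func: Callable[[], object],
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    retry_exceptions: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
):
    attempt = 1
    while True:
        try:
            return func()
        except retry_exceptions:
            # Give up once the attempt budget is exhausted.
            if attempt >= max_attempts:
                raise
            # Assumed schedule: delay, delay * factor, delay * factor**2, ...
            sleep(delay * backoff_factor ** (attempt - 1))
            attempt += 1


calls = {"count": 0}


def flaky():
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("transient failure")
    return "ok"


waits = []  # records the delays instead of actually sleeping
result = run_with_retries(flaky, retry_exceptions=(IOError,), sleep=waits.append)
print(result, waits)  # ok [1.0, 2.0]
```

Exceptions that are not listed in `retry_exceptions` are not caught by the loop, so they propagate immediately without a retry.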

## Example Workflow with RetryPolicy

```python
from preswald import Workflow, RetryPolicy

# Define a workflow-wide retry policy
workflow = Workflow(
    default_retry_policy=RetryPolicy(
        max_attempts=3,
        delay=1.0,
        backoff_factor=2.0,
        retry_exceptions=(IOError, TimeoutError)
    )
)

@workflow.atom()
def load_data():
    raise IOError("Simulated failure")

# Atom-specific retry policy
@workflow.atom(retry_policy=RetryPolicy(max_attempts=5, delay=0.5))
def fetch_data():
    raise TimeoutError("Another simulated failure")

# Execute the workflow
try:
    workflow.execute()
except Exception as e:
    print(f"Workflow execution failed: {e}")
```

## Why Use `RetryPolicy`?

- **Reliability**: Automatically handle transient failures with retries.
- **Efficiency**: Prevent infinite retry loops with customizable limits and backoff strategies.
- **Control**: Fine-tune retry behavior for specific workflows or tasks.

Enhance your workflow resilience with `RetryPolicy`! 🔄
